#!/usr/bin/env python
# coding: utf-8

# In[1]:


exec(open('init_path.py').read())
exec((P_Lib/'GasStation.py').read_text())
from io import BytesIO
from zipfile import ZipFile
get_ipython().run_line_magic('matplotlib', 'inline')


# ### Weather Data

# In[7]:


def build_table_from_webpage(url):
    ls_filenames = extract_all_links_in_url(url)
    df = DataFrame(ls_filenames, columns=['Filename'])
    df = df[df.Filename.str.contains('10minuten')]
    df['URL'] = url + df.Filename
    s = df.Filename.str.split('_')
    df['Type'] = s.str[1]
    df['StID'] = s.str[2]
    df['Start'] = s.str[3]
    df['End'] = s.str[4]
    return df


# In[8]:


url_temp = 'https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/10_minutes/air_temperature/historical/'
url_rain = 'https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/10_minutes/precipitation/historical/'
df_temp = build_table_from_webpage(url_temp)
df_rain = build_table_from_webpage(url_rain)
df = df_temp.append(df_rain)
df['Start'] = df.Start.astype(int)
df['End'] = df.End.astype(int)
df = df[~((df.End<=20160101)|(df.Start>=20190101))]
df.head(2)


# In[9]:


gb = df.groupby(['Type','StID'])
df_station = concat([gb.URL.apply(list), gb.Start.min(), gb.End.max()], axis=1).reset_index()
df_station.head(2)


# In[10]:


# remove stations that do not have 16,17,18
df_station = df_station[(df_station.Start<=20160101) & (df_station.End>=20181231)]
print(df_station.head(2))
print(df_station.Type.value_counts())


# In[11]:


def download_file_in_memory(url):
    resp = urlopen(url)
    return BytesIO(resp.read())

def extract_single_file_zip_in_memory(data_zipped):
    zipfile = ZipFile(data_zipped)
    return zipfile.open(zipfile.namelist()[0])

def download_extract_info(url, cols):
    data_zipped = download_file_in_memory(url)
    data = extract_single_file_zip_in_memory(data_zipped)
    return read_table(data, delimiter=';')[cols]


# In[21]:


# for each row, download zipfile, extract it, and process (merge, select) & save it
# Temperature
cols = ['STATIONS_ID','MESS_DATUM','TT_10']
ls_URL = flatten_list(df_station.loc[df_station.Type=='TU', 'URL'].tolist())
print(len(ls_URL))
df = concat([download_extract_info(url, cols) for url in ls_URL]) # download all urls
df.columns = ['StID','YMDHM','Temp']
df = df[(df.YMDHM>=201601010000) & (df.YMDHM<201901010000)].copy()


# In[32]:


df.head(2)


# In[19]:


# save to file
df = df.drop_duplicates(['StID','YMDHM'])
df.to_parquet(P_GS_Data_Raw / 'Weather' / 'temp_2016_2018.parquet', compression='brotli', index=False)


# In[16]:


# Rain - RWS_10 (Rain amount)
cols = ['STATIONS_ID','MESS_DATUM','RWS_10']
ls_URL = flatten_list(df_station.loc[df_station.Type=='nieder', 'URL'].tolist())
print(len(ls_URL))

ls = []
for i, url in enumerate(ls_URL):
    print(i,end=',')
    df = download_extract_info(url, cols)
    df.columns = ['StID','YMDHM','Rain']
    df = df[(df.YMDHM>=201601010000) & (df.YMDHM<201901010000)].copy()
    ls.append(df)
df = concat(ls)

# save to file
df = df.drop_duplicates(['StID','YMDHM'])
df.to_parquet(P_GS_Data_Raw / 'Weather' / 'rain_2016_2018.parquet', compression='brotli', index=False)


# ### Build Data Dictionary

# In[2]:


df = read_parquet(P_GS_Data_Raw / 'Weather' / (weather_type+'_2016_2018.parquet'))
df.head(2)


# In[ ]:


date = DataFrame(sorted(df.YMDHM.unique()),columns=['intYMDHM'])
date['dtYMDHM'] = to_datetime(date.intYMDHM.astype(str))
s = date.dtYMDHM.dt
date['intYM'] = s.year*100 + s.month
date['intH'] = s.hour
date = date.set_index('intYMDHM')
date.head(2)


# In[ ]:


date.to_parquet(P_GS_Data_Raw / 'Weather' / 'date.parquet', compression='brotli')


# In[54]:


date = read_parquet(P_GS_Data_Raw / 'Weather' / 'date.parquet')
date.head(2)


# ### Define Weather Shocks

# In[55]:


weather_type = 'temp'


# In[56]:


df = read_parquet(P_GS_Data_Raw / 'Weather' / (weather_type+'_2016_2018.parquet'))
df = df[df[weather_type.capitalize()]!=-999].copy()
# only stations with full coverage from 2016 to 2018
gb = df.groupby('StID')
minDate, maxDate = gb.YMDHM.min(), gb.YMDHM.max()
ls = minDate[(minDate<=201601010000) & (maxDate>=201812310000)].index.tolist()
df = df[df.StID.isin(ls)].copy()


# In[57]:


df['intH'] = df.YMDHM.map(date.intH.to_dict())
df = df[(df.intH>=h_min) & (df.intH<=h_max-1)].copy()
df = df.sort_values(['StID','YMDHM'])
df.head(2)


# In[58]:


df['Chg'] = df[weather_type.capitalize()] - df.groupby('StID')[weather_type.capitalize()].shift(1)
df['ChgAbs'] = df.Chg.abs()
df['intYM'] = df.YMDHM.map(date.intYM.to_dict())
df.head(2)


# In[59]:


shock_thld = concat([df[df.Chg<0].groupby(['StID','intYM']).ChgAbs.quantile(0.9), df[df.Chg>0].groupby(['StID','intYM']).ChgAbs.quantile(0.9), df.groupby(['StID','intYM']).ChgAbs.quantile(0.9)], axis=1)
shock_thld.columns = ['ThldNeg','ThldPos','ThldAny']
shock_thld = shock_thld.reset_index()
df = merge(df, shock_thld)
df['ShockNeg'] = ((df.Chg<0) & (df.ChgAbs>=df.ThldNeg)).astype(int)
df['ShockPos'] = ((df.Chg>0) & (df.ChgAbs>=df.ThldPos)).astype(int)
df['ShockAny'] = (df.ChgAbs>=df.ThldAny).astype(int)
df.head(2)


# In[60]:


s = df.loc[df[['ShockNeg','ShockPos','ShockAny']].sum(axis=1)>=1, ['StID','YMDHM','ShockNeg','ShockPos','ShockAny']].copy()


# In[61]:


s.to_parquet(P_GS_Data_Raw / 'Weather' / (weather_type+'_shocks.parquet'), compression='brotli', index=False)


# ### Weather Station Cleanup

# In[62]:


weather_type = 'temp'


# In[63]:


shock = read_parquet(P_GS_Data_Raw / 'Weather' / (weather_type+'_shocks.parquet'))
shock.head(2)


# In[64]:


f = P_GS_Data_Raw / 'Weather' / 'StationMeta' / (weather_type+'.txt')
station = read_fwf(f, widths=[5,9,9,20,7,10,30,30], encoding='cp1250').loc[1:]
station = station.rename(columns={'Stati':'StID','he geoB':'Lat','reite geoL':'Lon'})[['StID','Lat','Lon']]
station['StID'] = station.StID.astype(int)
station[['Lat','Lon']] = station[['Lat','Lon']].astype(float)
station.head(2)


# In[65]:


# only select stations with data that satisfies selection, e.g., 16-18
print(station.shape)
station = station[station.StID.isin(shock.StID.unique().tolist())].copy()
print(station.shape)
print(shock.StID.nunique())


# In[66]:


station.to_parquet(f.with_suffix('.parquet'), compression='brotli', index=False)


# ### Match GSID to WSID

# In[67]:


weather_type = 'temp'


# In[68]:


f = P_GS_Data_Raw / 'Weather' / 'StationMeta' / (weather_type+'.parquet')
ws = read_parquet(f)
dict_wsid_to_lat = ws.set_index('StID').Lat.to_dict()
dict_wsid_to_lon = ws.set_index('StID').Lon.to_dict()


# In[69]:


GS = read_hdf(f_GS, 'GS')
GS = GS[GS.Lat.notnull()]
dict_gsid_to_lat = GS.set_index('StID').Lat.to_dict()
dict_gsid_to_lon = GS.set_index('StID').Lng.to_dict()


# In[70]:


ls = list(itertools.product(GS.StID.tolist(), ws.StID.tolist()))
df = DataFrame(ls, columns = ['GSID','WSID'])
df['GSLat'] = df.GSID.map(dict_gsid_to_lat)
df['GSLon'] = df.GSID.map(dict_gsid_to_lon)
df['WSLat'] = df.WSID.map(dict_wsid_to_lat)
df['WSLon'] = df.WSID.map(dict_wsid_to_lon)
df.head(2)


# In[71]:


from geog import distance
df['Direct'] = distance(df[['GSLon','GSLat']].values, df[['WSLon','WSLat']].values)
df.head(2)


# In[72]:


# for each GSID, get the one with the shortest distance
df = df.sort_values(['GSID','Direct']).groupby('GSID').first()
dict_gsid_to_wsid = df.WSID.to_dict()


# In[73]:


f = P_GS_Data / 'Shock_Weather' / ('dict_gsid_to_wsid_'+weather_type+'.pkl')
save_obj(dict_gsid_to_wsid, f)


# In[ ]:




